本文为您介绍如何使用Flume同步EMR Kafka集群的数据至阿里云OSS-HDFS(JindoFS服务)。

背景信息

OSS-HDFS服务是一款云原生数据湖存储产品,基于统一的元数据管理能力,在完全兼容HDFS文件系统接口的同时,提供充分的POSIX能力支持,能更好的满足大数据和AI领域丰富多样的数据湖计算场景,详细信息请参见OSS-HDFS服务概述

前提条件

操作步骤

  1. 开启OSS-HDFS。

    开通并授权访问OSS-HDFS服务,具体操作请参见开通并授权访问OSS-HDFS服务

  2. 配置Flume。
    1. 进入Flume的配置页面。
      1. 登录EMR on ECS控制台
      2. 在顶部菜单栏处,根据实际情况选择地域和资源组
      3. 集群管理页面,单击目标集群操作列的集群服务
      4. 集群服务页面,单击FLUME服务区域的配置
    2. 设置JVM最大可用内存(Xmx)。
      Flume向OSS写入数据时,因为需要占用较大的JVM内存,所以可以增大Flume Agent的Xmx。
      1. 单击flume-env.sh页签。

        本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置

      2. 修改JAVA_OPTS的参数值。

        例如,设置为1g,则参数值修改为-Xmx1g。

    3. 单击flume-conf.properties页签。
      本文示例采用的是全局配置方式,如果您想按照节点配置,可以在FLUME服务配置页面的下拉列表中选择独立节点配置
    4. flume-conf.properties的参数值中,添加以下内容。
      说明 代码示例中的default-agent,请与FLUME服务配置页面的agent_name参数的参数值保持一致。
      default-agent.sources = source1
      default-agent.sinks = k1
      default-agent.channels = c1
      
      default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      default-agent.sources.source1.channels = c1
      default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...>
      default-agent.sources.source1.kafka.topics = flume-test
      default-agent.sources.source1.kafka.consumer.group.id = flume-test-group
      
      default-agent.sinks.k1.type = hdfs
      default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
      default-agent.sinks.k1.hdfs.fileType=DataStream
      
      # Use a channel which buffers events in memory
      default-agent.channels.c1.type = memory
      default-agent.channels.c1.capacity = 100
      default-agent.channels.c1.transactionCapacity = 100
      
      # Bind the source and sink to the channel
      default-agent.sources.source1.channels = c1
      default-agent.sinks.k1.channel = c1
      参数 描述
      default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口号。
      default-agent.sinks.k1.hdfs.path OSS-HDFS的路径。填写格式为oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>

      例如,oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result。

      说明 <examplebucket>为Bucket名称,<exampleregion>为地域ID,<exampledir>为OSS上的目录名。
      default-agent.channels.c1.capacity 通道中存储的最大事件数。请根据实际环境修改该参数值。
      default-agent.channels.c1.transactionCapacity 每个事务通道将从源接收或提供给接收器的最大事件数。请根据实际环境修改该参数值。
  3. 测试数据同步情况。
    1. 通过SSH方式连接DataFlow集群,详情请参见登录集群
    2. 创建名称为flume-test的Topic。
      kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
    3. 生成测试数据。
      kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092

      例如输入abc并回车。

    4. oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路径下会以当前时间的时间戳(毫秒)为后缀生成文件FlumeData.xxxx